-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-38909][BUILD][CORE][YARN][FOLLOWUP] Make some code cleanup related to shuffle state db #37648
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This reverts commit 2caffd1.
| .build(indexCacheLoader); | ||
| DBBackend dbBackend = null; | ||
| DBBackend dbBackend; | ||
| if (registeredExecutorFile != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need this condition? I feel like it's useless.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm... registeredExecutorFile may be null, for example, the following code path:
spark/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
Lines 81 to 92 in 68c47d5
| protected def newShuffleBlockHandler(conf: TransportConf): ExternalBlockHandler = { | |
| if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) { | |
| val shuffleDBName = sparkConf.get(config.SHUFFLE_SERVICE_DB_BACKEND) | |
| val dBBackend = DBBackend.byName(shuffleDBName) | |
| logInfo(s"Configured ${config.SHUFFLE_SERVICE_DB_BACKEND.key} as $shuffleDBName " + | |
| s"and actually used value ${dBBackend.name()} ") | |
| new ExternalBlockHandler(conf, | |
| findRegisteredExecutorsDBFile(dBBackend.fileName(registeredExecutorsDB))) | |
| } else { | |
| new ExternalBlockHandler(conf, null) | |
| } | |
| } |
Lines 79 to 84 in 68c47d5
| public ExternalBlockHandler(TransportConf conf, File registeredExecutorFile) | |
| throws IOException { | |
| this(new OneForOneStreamManager(), | |
| new ExternalShuffleBlockResolver(conf, registeredExecutorFile), | |
| new NoOpMergedShuffleFileManager(conf, null)); | |
| } |
However, without adding this condition, the result will not change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EDIT And the log will look strange without this condition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
e3500cf remove the condition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wait CI
| String dbBackendName = | ||
| conf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND, DBBackend.LEVELDB.name()); | ||
| DBBackend dbBackend = DBBackend.byName(dbBackendName); | ||
| db = DBProvider.initDB(dbBackend, this.registeredExecutorFile, CURRENT_VERSION, mapper); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review note: Recovery need not be enabled for node managers - in which case registeredExecutorFile will be null (in addition to tests).
DBProvider.initDB does handle null input though.
So the main change in this file and RemoteBlockPushResolver is moving the log message into if (db != null)
|
Is this still WIP @LuciferYang ? |
|
Removed [WIP], and need @Ngone51 confirm whether there are new comments of #36200. And I have a point to discuss: There are two |
@Ngone51 @mridulm 7a22aeb Move |
|
Do you have time to further review this one? Thanks @mridulm |
mridulm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a couple of comments - but since this is to mostly address @Ngone51's feedback; would be good if you can review it !
common/network-common/src/main/java/org/apache/spark/network/util/LevelDBProvider.java
Show resolved
Hide resolved
| } | ||
| } else { | ||
| null | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will end up duplicating logic between DBProvider and ShuffleTestAccessor - let us consolidate it in DBProvider.
This also impacts the integration test anyway
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, let me revert this change first and then see if there is a better way
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do some check, I think we can directly use another initDB and delete this one.
| import org.apache.spark.network.yarn.util.HadoopConfigProvider | ||
| import org.apache.spark.tags.ExtendedLevelDBTest | ||
| import org.apache.spark.util.Utils | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
revert this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| version: StoreVersion, | ||
| mapper: ObjectMapper) | ||
| : ConcurrentMap[ExternalShuffleBlockResolver.AppExecId, ExecutorShuffleInfo] = { | ||
| val db = DBProvider.initDB(dbBackend, file, version, mapper) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a behavior change for test, right (this ?)
Essentially, YarnShuffleIntegrationSuite would expect it to fail with an exception and not do error handling (which initDB does).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lines 155 to 170 in 19b1780
| try { | |
| val data = sc.parallelize(0 until 100, 10).map { x => (x % 10) -> x }.reduceByKey{ _ + _ }. | |
| collect().toSet | |
| sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) | |
| data should be ((0 until 10).map{x => x -> (x * 10 + 450)}.toSet) | |
| result = "success" | |
| // only one process can open a leveldb file at a time, so we copy the files | |
| if (registeredExecFile != null && execStateCopy != null) { | |
| val dbBackendName = conf.get(SHUFFLE_SERVICE_DB_BACKEND.key) | |
| val dbBackend = DBBackend.byName(dbBackendName) | |
| logWarning(s"Configured ${SHUFFLE_SERVICE_DB_BACKEND.key} as $dbBackendName " + | |
| s"and actually used value ${dbBackend.name()}") | |
| FileUtils.copyDirectory(registeredExecFile, execStateCopy) | |
| assert(!ShuffleTestAccessor | |
| .reloadRegisteredExecutors(dbBackend, execStateCopy).isEmpty) | |
| } |
Do you mean the behavior change is execStateCopy exists but open failed? The original method will throw an exception directly, and the new method will return a new instance? It seems that the case just to test whether an existing execStateCopy can be loaded again?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the original case does not test the execStateCopy exists but open failed scenario, it seems that there is no change.
For the initDB(dbBackend, file) method:
spark/common/network-common/src/main/java/org/apache/spark/network/util/LevelDBProvider.java
Lines 88 to 94 in 19b1780
| @VisibleForTesting | |
| static DB initLevelDB(File file) throws IOException { | |
| Options options = new Options(); | |
| options.createIfMissing(true); | |
| JniDBFactory factory = new JniDBFactory(); | |
| return factory.open(file, options); | |
| } |
- if
execStateCopyexists, it will re-open it and load data fromexecStateCopy - else if
execStateCopynot exists, it will open a new db and return empty data due tooptions.createIfMissing(true), then assert failed
For the initDB(dbBackend, dbFile, version, mapper) method:
- if
execStateCopyexists, it will re-open it and load data fromexecStateCopy - else if
execStateCopynot exists, it will open a new db and return empty data, then assert failed
For execStateCopy exists but open failed scenario, use initDB(dbBackend, file) will throw an unhandled exception, use initDB(dbBackend, dbFile, version, mapper) will assert failed. Is this unacceptable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mridulm any suggestions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@squito or @tgravescs should have more context about this, and comment better - looking at the test, I would expect this test code path to additionally test if the DB is missing/invalid/etc and fail in case there is any issues (due to lack of error handling/fallback in LevelDBProvider.initLevelDB) - which changes with in PR, that no longer happens; right ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mridulm add SPARK-40364 to tracking this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will give a separately pr to continue to explore the feasibility about this change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry didn't get a chance to look before your filed the other issue, happy to look if you make the change under the other issue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @tgravescs , I will give another pr later ~
|
Run / Base image build failed as follows: friendly ping @Yikun for help, I should not have changed GitHub's configuration |
|
pls rebase this PR, after #37815 merged. Related github incident: https://www.githubstatus.com/incidents/d181frs643d4 |
Thanks ~ |
|
You can retrigger the ci, the issue github already fix the https://www.githubstatus.com/incidents/d181frs643d4 |
|
Merged to master. |
|
Thanks for your review @mridulm @tgravescs ~ |
What changes were proposed in this pull request?
This is a followup of #36200, the main change of this pr as follows:
DBandDBProvideras @Private to make the API intent clearerremovemethod fromDBProviderwitch is unnecessary@OverrideExternalShuffleBlockResolverandRemoteBlockPushResolver, fix related sutesWhy are the changes needed?
Fix new comments after #36200 merged.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Pass All GitHub Actions